{
  "cells": [
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "ur8xi4C7S06n"
      },
      "outputs": [],
      "source": [
        "# Copyright 2024 Google LLC\n",
        "#\n",
        "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "#     https://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "JAPoU8Sm5E6e"
      },
      "source": [
        "# Monitor batch prediction with Gemini API\n",
        "\n",
        "<table align=\"left\">\n",
        "  <td style=\"text-align: center\">\n",
        "    <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/generative-ai/blob/main/gemini/batch-prediction/monitor_batch_prediction_gemini_api.ipynb\">\n",
        "      <img width=\"32px\" src=\"https://www.gstatic.com/pantheon/images/bigquery/welcome_page/colab-logo.svg\" alt=\"Google Colaboratory logo\"><br> Open in Colab\n",
        "    </a>\n",
        "  </td>\n",
        "  <td style=\"text-align: center\">\n",
        "    <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fgenerative-ai%2Fmain%2Fgemini%2Fbatch-prediction%2Fmonitor_batch_prediction_gemini_api.ipynb\">\n",
        "      <img width=\"32px\" src=\"https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN\" alt=\"Google Cloud Colab Enterprise logo\"><br> Open in Colab Enterprise\n",
        "    </a>\n",
        "  </td>\n",
        "  <td style=\"text-align: center\">\n",
        "    <a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/gemini/batch-prediction/monitor_batch_prediction_gemini_api.ipynb\">\n",
        "      <img src=\"https://www.gstatic.com/images/branding/gcpiconscolors/vertexai/v1/32px.svg\" alt=\"Vertex AI logo\"><br> Open in Vertex AI Workbench\n",
        "    </a>\n",
        "  </td>\n",
        "  <td style=\"text-align: center\">\n",
        "    <a href=\"https://github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/batch-prediction/monitor_batch_prediction_gemini_api.ipynb\">\n",
        "      <img width=\"32px\" src=\"https://raw.githubusercontent.com/primer/octicons/refs/heads/main/icons/mark-github-24.svg\" alt=\"GitHub logo\"><br> View on GitHub\n",
        "    </a>\n",
        "  </td>\n",
        "</table>\n",
        "\n",
        "<div style=\"clear: both;\"></div>\n",
        "\n",
        "<b>Share to:</b>\n",
        "\n",
        "<a href=\"https://www.linkedin.com/sharing/share-offsite/?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/batch-prediction/monitor_batch_prediction_gemini_api.ipynb\" target=\"_blank\">\n",
        "  <img width=\"20px\" src=\"https://upload.wikimedia.org/wikipedia/commons/8/81/LinkedIn_icon.svg\" alt=\"LinkedIn logo\">\n",
        "</a>\n",
        "\n",
        "<a href=\"https://bsky.app/intent/compose?text=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/batch-prediction/monitor_batch_prediction_gemini_api.ipynb\" target=\"_blank\">\n",
        "  <img width=\"20px\" src=\"https://upload.wikimedia.org/wikipedia/commons/7/7a/Bluesky_Logo.svg\" alt=\"Bluesky logo\">\n",
        "</a>\n",
        "\n",
        "<a href=\"https://twitter.com/intent/tweet?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/batch-prediction/monitor_batch_prediction_gemini_api.ipynb\" target=\"_blank\">\n",
        "  <img width=\"20px\" src=\"https://upload.wikimedia.org/wikipedia/commons/5/5a/X_icon_2.svg\" alt=\"X logo\">\n",
        "</a>\n",
        "\n",
        "<a href=\"https://reddit.com/submit?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/batch-prediction/monitor_batch_prediction_gemini_api.ipynb\" target=\"_blank\">\n",
        "  <img width=\"20px\" src=\"https://redditinc.com/hubfs/Reddit%20Inc/Brand/Reddit_Logo.png\" alt=\"Reddit logo\">\n",
        "</a>\n",
        "\n",
        "<a href=\"https://www.facebook.com/sharer/sharer.php?u=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/batch-prediction/monitor_batch_prediction_gemini_api.ipynb\" target=\"_blank\">\n",
        "  <img width=\"20px\" src=\"https://upload.wikimedia.org/wikipedia/commons/5/51/Facebook_f_logo_%282019%29.svg\" alt=\"Facebook logo\">\n",
        "</a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "84f0f73a0f76"
      },
      "source": [
        "| | |\n",
        "|-|-|\n",
        "| Author(s) |  [Ivan Nardini](https://github.com/your-github-username/) |"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "tvgnzT1CKxrO"
      },
      "source": [
        "## Overview\n",
        "\n",
        "While the Gemini API allows asynchronous batch predictions to Cloud Storage or BigQuery, it currently lacks built-in completion notifications. This notebook addresses this gap by leveraging Vertex AI Pipelines to manage the workflow and track job status.\n",
        "\n",
        "\n",
        "### Objectives\n",
        "\n",
        "This tutorial demonstrates how to orchestrate and monitor Gemini batch prediction jobs using Vertex AI Pipelines.\n",
        "\n",
        "Specifically, you will learn how to:\n",
        "\n",
        "1. **Prepare Batch Inputs and Output Location:** Set up your data in Cloud Storage and designate a Cloud Storage bucket for the model's output.\n",
        "2. **Build a Vertex AI Pipeline for Batch Prediction:** Define a pipeline that encapsulates the batch prediction job.\n",
        "3. **Submit a Vertex AI Pipeline Job:** Execute the defined pipeline, triggering the batch prediction process on the Gemini model.  \n",
        "4. **Retrieve Batch Prediction Results:**  Access and process the predictions generated by the Gemini model once the pipeline completes."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "61RBz8LLbxCR"
      },
      "source": [
        "## Get started"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "No17Cw5hgx12"
      },
      "source": [
        "### Install Vertex AI SDK and other required packages\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "tFy3H3aPgx12"
      },
      "outputs": [],
      "source": [
        "%pip install --upgrade --user --quiet google-cloud-aiplatform google-cloud-bigquery kfp google-cloud-pipeline-components"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "dmWOrTJ3gx13"
      },
      "source": [
        "### Authenticate your notebook environment (Colab only)\n",
        "\n",
        "If you're running this notebook on Google Colab, run the cell below to authenticate your environment."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "NyKGtVQjgx13"
      },
      "outputs": [],
      "source": [
        "import sys\n",
        "\n",
        "if \"google.colab\" in sys.modules:\n",
        "    from google.colab import auth\n",
        "\n",
        "    auth.authenticate_user()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Dy5VO78rzX8c"
      },
      "source": [
        "### Requirements"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "R8Zm9y0hxU5O"
      },
      "source": [
        "#### Set Project ID and Location\n",
        "\n",
        "To get started using Vertex AI, you must have an existing Google Cloud project and [enable these APIs](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,artifactregistry.googleapis.com).\n",
        "\n",
        "Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "jWfqWIVrfJtA"
      },
      "outputs": [],
      "source": [
        "# Use the environment variable if the user doesn't provide Project ID.\n",
        "import os\n",
        "\n",
        "# fmt: off\n",
        "PROJECT_ID = \"[your-project-id]\"  # @param {type: \"string\", placeholder: \"[your-project-id]\", isTemplate: true}\n",
        "# fmt: on\n",
        "\n",
        "if not PROJECT_ID or PROJECT_ID == \"[your-project-id]\":\n",
        "    PROJECT_ID = str(os.environ.get(\"GOOGLE_CLOUD_PROJECT\"))\n",
        "\n",
        "PROJECT_NUMBER = !gcloud projects describe {PROJECT_ID} --format=\"get(projectNumber)\"[0]\n",
        "PROJECT_NUMBER = PROJECT_NUMBER[0]\n",
        "\n",
        "LOCATION = os.environ.get(\"GOOGLE_CLOUD_REGION\", \"us-central1\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "20D88NRAfJtA"
      },
      "source": [
        "#### Set and create a Cloud Storage bucket\n",
        "\n",
        "Create a storage bucket to store intermediate artifacts such as models."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "LnzVwUEnfJtA"
      },
      "outputs": [],
      "source": [
        "# fmt: off\n",
        "BUCKET_NAME = \"your-bucket-name-{PROJECT_ID}-unique\"  # @param {type:\"string\"}\n",
        "\n",
        "BUCKET_URI = f\"gs://{BUCKET_NAME}\"  # @param {type:\"string\"}\n",
        "# fmt: on"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Ps8aCR_tfJtA"
      },
      "outputs": [],
      "source": [
        "! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "set_service_account"
      },
      "source": [
        "#### Set Service Account and permissions\n",
        "\n",
        "You will need to have the following IAM roles set:\n",
        "\n",
        "- Vertex AI User (roles/aiplatform.user)\n",
        "- BigQuery Data Editor (roles/bigquery.dataEditor)\n",
        "- Storage Object Admin (roles/storage.objectAdmin)\n",
        "\n",
        "For more information about granting roles, see [Manage access](https://cloud.google.com/iam/docs/granting-changing-revoking-access).\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "VX9tpdtuQI5L"
      },
      "source": [
 If you run following">
        "> If you run the following commands in Vertex AI Workbench, run them directly in the terminal.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "ssUJJqXJJHgC"
      },
      "outputs": [],
      "source": [
        "SERVICE_ACCOUNT = f\"{PROJECT_NUMBER}-compute@developer.gserviceaccount.com\""
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "wqOHg5aid6HP"
      },
      "outputs": [],
      "source": [
        "for role in ['aiplatform.user', 'storage.objectAdmin', 'bigquery.dataEditor']:\n",
        "\n",
        "    ! gcloud projects add-iam-policy-binding {PROJECT_ID} \\\n",
        "      --member=serviceAccount:{SERVICE_ACCOUNT} \\\n",
        "      --role=roles/{role} --condition=None"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "JW9GEXN4Zjai"
      },
      "source": [
        "### Set and create a BigQuery table\n",
        "\n",
        "Create a BigQuery table to store predictions."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "D6m41CtmZs3Q"
      },
      "outputs": [],
      "source": [
        "from datetime import datetime\n",
        "\n",
        "from google.cloud import bigquery\n",
        "\n",
        "def create_bq_table(\n",
        "    dataset_id: str,\n",
        "    project_id: str = PROJECT_ID,\n",
        "    location: str = LOCATION,\n",
        ") -> tuple[str, str]:\n",
        "    \"\"\"Creates a BigQuery dataset and generates a table URI for batch predictions.\"\"\"\n",
        "    # Initialize BigQuery client\n",
        "    bq_client = bigquery.Client(project=project_id, location=location)\n",
        "\n",
        "    # Create dataset reference\n",
        "    dataset_path = f\"{project_id}.{dataset_id}\"\n",
        "    dataset = bigquery.Dataset(dataset_path)\n",
        "    dataset.location = location\n",
        "\n",
        "    # Create or get existing dataset\n",
        "    dataset = bq_client.create_dataset(dataset, exists_ok=True, timeout=30)\n",
        "\n",
        "    # Generate table URI with timestamp\n",
        "    timestamp = datetime.now().strftime(\"%Y%m%d%H%M%S\")\n",
        "    table_id = f\"prediction_result_{timestamp}\"\n",
        "    table_uri = f\"bq://{project_id}.{dataset_id}.{table_id}\"\n",
        "\n",
        "    return table_uri"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Iln8vBRoaS__"
      },
      "outputs": [],
      "source": [
        "# fmt: off\n",
        "BQ_DATASET = \"gen_ai_batch_prediction\"  # @param {type:\"string\"}\n",
        "# fmt: on\n",
        "OUTPUT_TABLE_URI = create_bq_table(dataset_id=BQ_DATASET)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "EjebjdiNxe_D"
      },
      "source": [
        "### Initiate Vertex AI SDK"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "GReulMr0xjZs"
      },
      "outputs": [],
      "source": [
        "import vertexai\n",
        "\n",
        "vertexai.init(project=PROJECT_ID, location=LOCATION, staging_bucket=BUCKET_URI)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "5303c05f7aa6"
      },
      "source": [
        "### Import libraries"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "6fc324893334"
      },
      "outputs": [],
      "source": [
        "from typing import NamedTuple\n",
        "\n",
        "from google.cloud import aiplatform\n",
        "from google_cloud_pipeline_components.types.artifact_types import VertexDataset\n",
        "from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp\n",
        "from google_cloud_pipeline_components.v1.vertex_notification_email import (\n",
        "    VertexNotificationEmailOp,\n",
        ")\n",
        "from kfp import compiler, dsl\n",
        "from kfp.dsl import Markdown, Output, component"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "1gWay_5fbIC_"
      },
      "source": [
        "### Set constants"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Ljbx9IXdbKD4"
      },
      "outputs": [],
      "source": [
        "INPUT_TABLE_URI = (\n",
        "    \"bq://storage-samples.generative_ai.batch_requests_for_multimodal_input_2\"\n",
        ")\n",
        "MODEL_ID = \"gemini-2.0-flash\"  # @param {type:\"string\", isTemplate: true}\n",
        "# fmt: off\n",
        "RECIPIENTS = [\"your-email@provider.com\"]  # @param {type: \"string\", placeholder: \"[your-email@provider.com]\", isTemplate: true}\n",
        "# fmt: on\n",
        "PIPELINE_ROOT = f\"{BUCKET_URI}/genai-prediction-pipeline\""
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "xOz28mO6KSKr"
      },
      "source": [
        "### Build the Batch prediction component\n",
        "\n",
        "Define a lightweight custom Kubeflow Pipelines component for running batch prediction jobs using Vertex AI's Generative Models.\n",
        "\n",
        "It takes an input BigQuery table, submits it to a specified Generative Model for batch prediction, and outputs the resulting predictions to a specified output BigQuery table location.\n",
        "\n",
        "The component monitors the job's progress and logs relevant information. Upon successful completion, it returns the URI of the output dataset.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Mq4q9DLjLKcc"
      },
      "outputs": [],
      "source": [
        "@component(\n",
        "    base_image=\"python:3.10\",\n",
        "    packages_to_install=[\"google-cloud-aiplatform\", \"google_cloud_pipeline_components\"],\n",
        ")\n",
        "def GenAIModelBatchPredictOp(\n",
        "    input_bq_table: str,\n",
        "    output_bq_table: str,\n",
        "    model_id: str,\n",
        "    project: str,\n",
        "    location: str,\n",
        "    output_dataset_artifact: Output[VertexDataset],\n",
        ") -> NamedTuple(\"outputs\", dataset_uri=str):\n",
        "    import logging\n",
        "    import sys\n",
        "    import time\n",
        "\n",
        "    import vertexai\n",
        "    from vertexai.batch_prediction import BatchPredictionJob\n",
        "    from vertexai.generative_models import GenerativeModel\n",
        "\n",
        "    # Configure logging\n",
        "    logging.basicConfig(\n",
        "        level=logging.INFO,\n",
        "        format=\"%(asctime)s - %(levelname)s - %(message)s\",\n",
        "        datefmt=\"%Y-%m-%d %H:%M:%S\",\n",
        "    )\n",
        "    logger = logging.getLogger(__name__)\n",
        "\n",
        "    # Initiate Vertex AI session\n",
        "    logger.info(\n",
        "        f\"Initializing Vertex AI session with project: {project}, location: {location}\"\n",
        "    )\n",
        "    vertexai.init(project=project, location=location)\n",
        "\n",
        "    # Initiate the model\n",
        "    logger.info(f\"Initializing GenerativeModel with model_id: {model_id}\")\n",
        "    model = GenerativeModel(model_id)\n",
        "\n",
        "    # Send the batch prediction request\n",
        "    logger.info(f\"Submitting batch prediction job - Input table: {input_bq_table}\")\n",
        "    logger.info(f\"Output will be stored at: {output_bq_table}\")\n",
        "    job = BatchPredictionJob.submit(\n",
        "        source_model=model_id,\n",
        "        input_dataset=input_bq_table,\n",
        "        output_uri_prefix=output_bq_table,\n",
        "    )\n",
        "\n",
        "    # Monitor the job\n",
        "    start_time = time.time()\n",
        "    logger.info(\"Starting job monitoring...\")\n",
        "    while not job.has_ended:\n",
        "        elapsed_time = time.time() - start_time\n",
        "        logger.info(f\"Job running... Elapsed time: {elapsed_time:.2f} seconds\")\n",
        "        time.sleep(60)\n",
        "        job.refresh()\n",
        "\n",
        "    # Check if the job succeeds\n",
        "    if job.has_succeeded:\n",
        "        total_time = time.time() - start_time\n",
        "        logger.info(f\"Job completed successfully in {total_time:.2f} seconds!\")\n",
        "        logger.info(f\"Output dataset available at: {output_bq_table}\")\n",
        "    else:\n",
        "        logger.error(f\"Job failed with error: {job.error}\")\n",
        "        sys.exit(1)\n",
        "\n",
        "    output_bq_table = job.output_location\n",
        "    component_outputs = NamedTuple(\"outputs\", dataset_uri=str)\n",
        "    logger.info(f\"Returning component output with dataset_uri: {output_bq_table}\")\n",
        "    return component_outputs(output_bq_table)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "A2tCfmrKSHEd"
      },
      "source": [
        "### Build a component to visualize the prediction table\n",
        "\n",
        "Define a lightweight custom Kubeflow Pipelines component for visualizing a prediction sample.\n",
        "\n",
        "The component takes the BigQuery table name, sample size, project, and location as inputs and outputs a markdown file. It uses the BigQuery Python client to query the table, pandas to process the data, and incorporates logging for visualization.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "VYIheA4ySGfO"
      },
      "outputs": [],
      "source": [
        "@component(\n",
        "    base_image=\"python:3.10\",\n",
        "    packages_to_install=[\n",
        "        \"google-cloud-bigquery[pandas]\",\n",
        "        \"google_cloud_pipeline_components\",\n",
        "    ],\n",
        ")\n",
        "def VisualizeBatchPredictionTable(\n",
        "    output_bq_table: str,\n",
        "    sample_size: int,\n",
        "    project: str,\n",
        "    location: str,\n",
        "    output_markdown_table: Output[Markdown],\n",
        "):\n",
        "    import logging\n",
        "    import sys\n",
        "\n",
        "    import pandas as pd\n",
        "    from google.cloud import bigquery\n",
        "\n",
        "    # Configure logging\n",
        "    logging.basicConfig(\n",
        "        level=logging.INFO,\n",
        "        format=\"%(asctime)s - %(levelname)s - %(message)s\",\n",
        "        datefmt=\"%Y-%m-%d %H:%M:%S\",\n",
        "    )\n",
        "    logger = logging.getLogger(__name__)\n",
        "\n",
        "    # Helper to extract only request and response text from the records\n",
        "    def extract_text(record):\n",
        "        try:\n",
        "            request_text = record[\"request\"][\"contents\"][0][\"parts\"][0][\"text\"]\n",
        "            response_text = record[\"response\"][\"candidates\"][0][\"content\"][\"parts\"][0][\n",
        "                \"text\"\n",
        "            ]\n",
        "            return {\"Request\": request_text, \"Response\": response_text}\n",
        "        except (KeyError, IndexError) as e:\n",
        "            logger.warning(f\"Could not extract text from record: {e}\")\n",
        "            return {\"Request\": \"\", \"Response\": \"\"}\n",
        "\n",
        "    # Helper function to escape pipe characters and handle multiline content\n",
        "    def escape_cell(val):\n",
        "        if val is None:\n",
        "            return \"\"\n",
        "        val_str = str(val)\n",
        "        # Escape pipe characters\n",
        "        val_str = val_str.replace(\"|\", \"\\\\|\")\n",
        "        # Replace newlines with <br>\n",
        "        val_str = val_str.replace(\"\\n\", \"<br>\")\n",
        "        return val_str\n",
        "\n",
        "    # Initialize BigQuery client\n",
        "    logger.info(f\"Initializing BigQuery client for project: {project} in {location}\")\n",
        "    client = bigquery.Client(project=project, location=location)\n",
        "\n",
        "    # Construct and execute query\n",
        "    output_bq_table = output_bq_table.replace(\"bq://\", \"\")\n",
        "    query = f\"\"\"\n",
        "        SELECT *\n",
        "        FROM `{output_bq_table}`\n",
        "        LIMIT {sample_size}\n",
        "    \"\"\"\n",
        "    logger.info(f\"Executing query on dataset: {output_bq_table}\")\n",
        "    logger.info(f\"Sampling {sample_size} rows\")\n",
        "    df = client.query(query).to_dataframe()\n",
        "    logger.info(f\"Query returned {len(df)} rows and {len(df.columns)} columns\")\n",
        "    if df.empty:\n",
        "        logger.error(\"No data found in table\")\n",
        "        sys.exit(1)\n",
        "\n",
        "    # Process DataFrame to extract texts\n",
        "    logger.info(\"Extracting request and response texts\")\n",
        "    processed_records = [extract_text(record) for record in df.to_dict(\"records\")]\n",
        "    processed_df = pd.DataFrame(processed_records)\n",
        "\n",
        "    # Format markdown table with proper escaping\n",
        "    logger.info(\"Converting DataFrame to markdown format\")\n",
        "    headers = \"|\" + \"|\".join(str(col) for col in processed_df.columns) + \"|\"\n",
        "    separator = \"|\" + \"|\".join(\"---\" for _ in processed_df.columns) + \"|\"\n",
        "\n",
        "    rows = []\n",
        "    for idx, row in processed_df.iterrows():\n",
        "        row_str = \"|\" + \"|\".join(escape_cell(val) for val in row) + \"|\"\n",
        "        rows.append(row_str)\n",
        "\n",
        "    # Combine all parts and write to file\n",
        "    markdown_table = \"\\n\".join([headers, separator] + rows)\n",
        "    logger.info(f\"Writing markdown table to: {output_markdown_table.path}\")\n",
        "    with open(output_markdown_table.path, \"w\") as f:\n",
        "        f.write(markdown_table)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Y7MtOyANNuey"
      },
      "source": [
        "### Define your workflow using Kubeflow Pipelines DSL package\n",
        "\n",
        "The kfp.dsl package contains the domain-specific language (DSL) that you can use to build the pipeline for running Gen AI batch prediction workflow."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "RqgkHVXhN0ik"
      },
      "outputs": [],
      "source": [
        "@dsl.pipeline(name=\"genai-batch-prediction-pipeline\")\n",
        "def pipeline(\n",
        "    input_dataset_name: str,\n",
        "    input_bq_table: str,\n",
        "    output_bq_table: str,\n",
        "    model_id: str,\n",
        "    sample_size: int,\n",
        "    project: str = PROJECT_ID,\n",
        "    location: str = LOCATION,\n",
        "    recipients: list = RECIPIENTS,\n",
        "):\n",
        "    notify_email_task = VertexNotificationEmailOp(recipients=recipients)\n",
        "\n",
        "    create_input_dataset_task = TabularDatasetCreateOp(\n",
        "        display_name=input_dataset_name,\n",
        "        bq_source=input_bq_table,\n",
        "        project=project,\n",
        "        location=location,\n",
        "    ).set_display_name(\"Create input dataset\")\n",
        "\n",
        "    with dsl.ExitHandler(notify_email_task, name=\"Notification handler\"):\n",
        "        run_batch_prediction_task = (\n",
        "            GenAIModelBatchPredictOp(\n",
        "                input_bq_table=input_bq_table,\n",
        "                output_bq_table=output_bq_table,\n",
        "                model_id=model_id,\n",
        "                project=project,\n",
        "                location=location,\n",
        "            )\n",
        "            .after(create_input_dataset_task)\n",
        "            .set_display_name(\"Run Gen AI Batch Prediction job\")\n",
        "        )\n",
        "\n",
        "        visualize_prediction_task = (\n",
        "            VisualizeBatchPredictionTable(\n",
        "                output_bq_table=output_bq_table,\n",
        "                sample_size=sample_size,\n",
        "                project=project,\n",
        "                location=location,\n",
        "            )\n",
        "            .after(run_batch_prediction_task)\n",
        "            .set_display_name(\"Visualize Gen AI Predictions\")\n",
        "        )"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "D0c4TIAMZL89"
      },
      "source": [
        "### Compile your pipeline into a YAML file\n",
        "\n",
        "After the workflow of your pipeline is defined, compile the pipeline into YAML format for executing your pipeline on Vertex AI Pipelines."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Km24n9Y4Y_fY"
      },
      "outputs": [],
      "source": [
        "compiler.Compiler().compile(pipeline_func=pipeline, package_path=\"pipeline.yaml\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "s7GNLpeaZrDs"
      },
      "source": [
        "#### Submit your pipeline run\n",
        "\n",
        "After compiling your pipeline, use the Vertex AI Python client to submit and run your pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "xTxkxDBKZC-V"
      },
      "outputs": [],
      "source": [
        "parameter_values = {\n",
        "    \"input_dataset_name\": \"genai_input_prediction_dataset\",\n",
        "    \"input_bq_table\": INPUT_TABLE_URI,\n",
        "    \"output_bq_table\": OUTPUT_TABLE_URI,\n",
        "    \"model_id\": MODEL_ID,\n",
        "    \"sample_size\": 10,\n",
        "    \"project\": PROJECT_ID,\n",
        "    \"location\": LOCATION,\n",
        "    \"recipients\": RECIPIENTS,\n",
        "}\n",
        "\n",
        "job = aiplatform.PipelineJob(\n",
        "    display_name=\"census-demo-pipeline\",\n",
        "    parameter_values=parameter_values,\n",
        "    template_path=\"pipeline.yaml\",\n",
        "    pipeline_root=PIPELINE_ROOT,\n",
        ")\n",
        "\n",
        "job.run()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "2a4e033321ad"
      },
      "source": [
        "## Cleaning up\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "jsUHLYv4BfRJ"
      },
      "outputs": [],
      "source": [
        "delete_pipeline_job = True\n",
        "delete_bigquery_dataset = True\n",
        "delete_bucket = True\n",
        "\n",
        "if delete_pipeline_job:\n",
        "    job.delete()\n",
        "\n",
        "# Delete the Cloud Storage bucket\n",
        "if delete_bucket:\n",
        "    ! gsutil -m rm -r {BUCKET_URI}\n",
        "\n",
        "# delete dataset\n",
        "if delete_bigquery_dataset:\n",
        "    ! bq rm -r -f -d {PROJECT_ID}:{BQ_DATASET}"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "name": "monitor_batch_prediction_gemini_api.ipynb",
      "toc_visible": true
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
